package com.uxsino.simo.collector.connections;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.auth.AuthDescriptor;
import net.spy.memcached.auth.PlainCallbackHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.uxsino.simo.connections.AbstractConnection;
import com.uxsino.simo.connections.target.TCPTarget;

public class MemcachedConnection extends AbstractConnection<TCPTarget> {
    static Logger logger = LoggerFactory.getLogger(MemcachedConnection.class);

    private MemcachedClient mclient;

    @Override
    public int connect(TCPTarget target) {
        super.connect(target);
        if (target.getUsername() == null || target.getUsername().length() == 0) {
            connectWithoutAuth(target);
        } else {
            connectWithAuth(target);
        }
        connected = true;
        state = 1;
        return state;
    }

    @Override
    public Object execCmd(Object cmdPattern) {
        JSONObject result = new JSONObject();

        // mclient.getStats() has only one entry for single node if connection, if not connected, 0 value
        for (Map<String, String> valueMap : mclient.getStats().values()) {
            for (Map.Entry<String, String> entry : valueMap.entrySet()) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        // result.put("transcoder_max_size", mclient.getTranscoder().getMaxSize());
        // System.out.println("result: " + result);
        return result;
    }

    @Override
    public Object buildCmd(String cmdPattern, Map<String, String> args) {
        return cmdPattern;
    }

    @Override
    public int close() {
        mclient.shutdown();
        connected = false;
        super.close();
        return 0;
    }

    private void connectWithoutAuth(TCPTarget target) {
        logger.info("start connecting to server without authentication");
        try {
            mclient = new MemcachedClient(
                    new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY).build(),
                    AddrUtil.getAddresses(target.host + ":" + target.port));
            connected = true;
        } catch (IOException e) {
            logger.error("connect without auth error: {}" + e.getMessage());
            connected = false;
        }
    }

    private void connectWithAuth(TCPTarget target) {
        logger.info("start connecting to server with authentication");
        try {
            AuthDescriptor auth = new AuthDescriptor(new String[] {},
                new PlainCallbackHandler(target.getUsername(), target.getPassword()));
            mclient = new MemcachedClient(
                new ConnectionFactoryBuilder().setAuthDescriptor(auth)
                    .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY).build(),
                AddrUtil.getAddresses(target.host + ":" + target.port));
            connected = true;
        } catch (IOException e) {
            logger.error("connect with auth error: {}" + e.getMessage());
            connected = false;
        }
    }

    @Override
    public boolean testWithConnected(String cmdString, String resStart) {
        JSONObject cmdRes = (JSONObject) execCmd("");
        if (cmdRes.isEmpty()) {
            connected = false;
        }
        logger.info("connection test result:{}", connected);
        return connected;
    }

}
